package com.ctw.tinyservices.id.core.snowflake.holder;

import com.ctw.tinyservices.id.common.utils.LogUtils;
import com.ctw.tinyservices.id.core.snowflake.exception.CheckLastTimeException;
import com.ctw.tinyservices.id.core.snowflake.exception.NoUsefulWorkerIdException;
import com.ctw.tinyservices.id.core.snowflake.exception.ZookeeperConnectFailException;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Recyclable-workerId snowflake holder backed by Zookeeper.
 *
 * <p>All candidate workerIds are pre-created as znodes under the "not used" pool. A starting
 * instance claims the first free node, moves it into the "used" pool, and keeps uploading its
 * latest generation timestamp there. A scheduled task scans the "used" pool and moves nodes
 * whose owner stopped uploading back into the "not used" pool, so workerIds are recycled
 * instead of being consumed forever.
 *
 * @author TongWei.Chen 2022/4/14 13:11
 **/
public class RecyclableZookeeperIdSnowflakeHolder extends AbstractIdSnowflakeHolder {
    // Persistent root node under which all snowflake data is stored.
    private static final String PATH_FOREVER = "/tinyservices_id/snowflake/forever";
    // Pool of workerIds that are not currently in use.
    private static final String PATH_RECYCLE_NOTUSED = PATH_FOREVER + "/recycle/notused";
    // Pool of workerIds that are currently in use.
    private static final String PATH_RECYCLE_USED = PATH_FOREVER + "/recycle/used";
    // Largest workerId (default 1023); determines how many nodes seed the "not used" pool.
    private final long maxWorkerId;
    // Maximum tolerated time (in millis) without a successful timestamp upload; default 1 hour.
    private final long maxUploadFailTime;
    // Interval (in MINUTES) between scans of the "used" pool for stale workerIds.
    private final long checkWorkIdStatInterval;
    // Flipped to false (stopping id generation) once Zookeeper has been unreachable too long.
    // volatile: written by the upload thread, read by id-generating threads via isAlive().
    private volatile boolean isAlive;
    // Name of the znode this instance claimed from the "not used" pool.
    private String currentNodeName;
    private final String zkConnectionAddr;
    private CuratorFramework curator;

    private static volatile RecyclableZookeeperIdSnowflakeHolder singleton;

    /**
     * Returns the process-wide singleton, creating it on first call (double-checked locking).
     * NOTE(review): {@code port}/{@code zkConnectionAddr} passed on subsequent calls are
     * silently ignored once the singleton exists.
     */
    public static RecyclableZookeeperIdSnowflakeHolder getInstance(Integer port, String zkConnectionAddr) {
        if (singleton == null) {
            synchronized (RecyclableZookeeperIdSnowflakeHolder.class) {
                if (singleton == null) {
                    singleton = new RecyclableZookeeperIdSnowflakeHolder(port, zkConnectionAddr);
                }
            }
        }
        return singleton;
    }

    private RecyclableZookeeperIdSnowflakeHolder(int port, String zkConnectionAddr) {
        super(port);
        this.maxWorkerId = 1023;
        this.maxUploadFailTime = TimeUnit.HOURS.toMillis(1);
        this.checkWorkIdStatInterval = 1L;
        this.isAlive = true;
        this.zkConnectionAddr = zkConnectionAddr;
    }

    /**
     * Claims a workerId: ensures the "not used" pool exists (seeding it on first boot), takes
     * its first child, validates the stored timestamp, then moves the node into the "used" pool.
     *
     * @throws NoUsefulWorkerIdException when the pool is empty or the node data is unreadable
     * @throws CheckLastTimeException    when the stored timestamp is ahead of local clock time
     */
    @Override
    protected void doProcess() throws Exception {
        curator = createWithOptions(zkConnectionAddr, new RetryUntilElapsed(1000, 4), 5000, 6000);
        curator.start();
        if (curator.checkExists().forPath(PATH_RECYCLE_NOTUSED) == null) {
            initNotUseNode();
        }
        // Pick one available node from the "not used" pool.
        List<String> keys = curator.getChildren().forPath(PATH_RECYCLE_NOTUSED);
        if (keys == null || keys.isEmpty()) {
            LogUtils.error(RecyclableZookeeperIdSnowflakeHolder.class, "no useful workerId in {} ", PATH_RECYCLE_NOTUSED);
            throw new NoUsefulWorkerIdException("no useful workerId in " + PATH_RECYCLE_NOTUSED);
        }

        currentNodeName = keys.get(0);
        byte[] bytes = curator.getData().forPath(PATH_RECYCLE_NOTUSED + "/" + currentNodeName);
        // FIX: decode with the same charset the data is written with (UTF-8) instead of the
        // platform default, which could corrupt the payload on non-UTF-8 JVMs.
        Endpoint endPoint = deBuildData(new String(bytes, StandardCharsets.UTF_8));
        if (endPoint == null) {
            throw new NoUsefulWorkerIdException("no useful workerId in " + PATH_RECYCLE_NOTUSED);
        }
        // A stored timestamp ahead of local time means our clock is behind the last owner's;
        // starting now could generate duplicate ids, so refuse to start.
        if (endPoint.getTimestamp() > System.currentTimeMillis()) {
            throw new CheckLastTimeException("init timestamp check error,forever node timestamp gt this node time");
        }
        workerId = endPoint.getWorkerId();
        // Remove the claimed node from the "not used" pool ...
        curator.delete().forPath(PATH_RECYCLE_NOTUSED + "/" + currentNodeName);
        // ... and register it in the "used" pool (under /used/).
        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath(PATH_RECYCLE_USED + "/" + currentNodeName, bytes);
        LogUtils.info(RecyclableZookeeperIdSnowflakeHolder.class, "[Old NODE]find forever node have this endpoint ip-{} port-{} workid-{} childnode and start SUCCESS", ip, port, workerId);
    }

    /** Uploads the latest timestamp to this instance's node in the "used" pool. */
    @Override
    protected void doUpdateMaxTimestamp() throws Exception {
        curator.setData().forPath(PATH_RECYCLE_USED + "/" + currentNodeName, buildData().getBytes(StandardCharsets.UTF_8));
    }

    /** Starts the background task that scans all upload records every checkWorkIdStatInterval minutes. */
    @Override
    protected void doStartOtherProcess() {
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "schedule-checkWorkerId-time");
            // Daemon so the scheduler never keeps the JVM alive on shutdown.
            thread.setDaemon(true);
            return thread;
        }).scheduleWithFixedDelay(this::checkWorkerIdUploadStat, checkWorkIdStatInterval, checkWorkIdStatInterval, TimeUnit.MINUTES);
    }

    /**
     * Called when the timestamp upload fails. If uploads have been failing for longer than
     * maxUploadFailTime, other servers may already have recycled this workerId back into the
     * "not used" pool; continuing could produce duplicate ids, so id generation is stopped.
     *
     * @throws ZookeeperConnectFailException once the failure window exceeds maxUploadFailTime
     */
    @Override
    protected void doUpdateMaxTimestampException() {
        if (System.currentTimeMillis() - lastUpdateTime > maxUploadFailTime) {
            this.isAlive = false;
            throw new ZookeeperConnectFailException("lost connect to zookeeper over maxUploadFailTime");
        }
    }

    @Override
    public boolean isAlive() {
        return this.isAlive;
    }

    /**
     * Scans every node in the "used" pool; any workerId whose timestamp has not been refreshed
     * for more than maxUploadFailTime plus a two-scan-interval grace period is moved back into
     * the "not used" pool so it can be claimed again.
     */
    private void checkWorkerIdUploadStat() {
        try {
            List<String> keys = curator.getChildren().forPath(PATH_RECYCLE_USED);
            // FIX: the original added raw minutes to a millisecond value
            // (maxUploadFailTime + 2 * checkWorkIdStatInterval), making the intended
            // "1 hour + 2 minutes" grace window effectively 1 hour + 2 MILLISECONDS.
            long staleThresholdMs = maxUploadFailTime + TimeUnit.MINUTES.toMillis(2 * checkWorkIdStatInterval);
            for (String nodeName : keys) {
                byte[] bytes = curator.getData().forPath(PATH_RECYCLE_USED + "/" + nodeName);
                // FIX: decode with UTF-8 to match how the data is written.
                Endpoint endPoint = deBuildData(new String(bytes, StandardCharsets.UTF_8));
                if (endPoint != null
                        && System.currentTimeMillis() - endPoint.getTimestamp() > staleThresholdMs) {
                    curator.delete().forPath(PATH_RECYCLE_USED + "/" + nodeName);
                    // creatingParentsIfNeeded() added for consistency with the other create paths.
                    curator.create()
                            .creatingParentsIfNeeded()
                            .withMode(CreateMode.PERSISTENT)
                            .forPath(PATH_RECYCLE_NOTUSED + "/" + nodeName, buildData(endPoint.getWorkerId()).getBytes(StandardCharsets.UTF_8));
                    LogUtils.info(RecyclableZookeeperIdSnowflakeHolder.class, "remove workerId {} from inuse to notuse success", nodeName);
                }
            }
        } catch (Exception e) {
            LogUtils.error(RecyclableZookeeperIdSnowflakeHolder.class, "check workerId status error path is {} error is {}", PATH_RECYCLE_USED, e);
        }
    }

    /**
     * Builds (but does not start) a Curator client for the given Zookeeper ensemble.
     *
     * @return an unstarted {@link CuratorFramework} client
     */
    private CuratorFramework createWithOptions(String zkAddress, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs) {
        return CuratorFrameworkFactory.builder().connectString(zkAddress)
                .retryPolicy(retryPolicy)
                .connectionTimeoutMs(connectionTimeoutMs)
                .sessionTimeoutMs(sessionTimeoutMs)
                .build();
    }

    /**
     * Seeds the "not used" pool with one persistent sequential node per workerId
     * (0..maxWorkerId inclusive), each holding its serialized endpoint data.
     *
     * @throws Exception if any znode creation fails
     */
    private void initNotUseNode() throws Exception {
        for (int i = 0; i <= maxWorkerId; i++) {
            curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(PATH_RECYCLE_NOTUSED + "/workerId:" + i, buildData(i).getBytes(StandardCharsets.UTF_8));
            LogUtils.info(RecyclableZookeeperIdSnowflakeHolder.class, "{} has created", PATH_RECYCLE_NOTUSED + "/workerId:" + i);
        }
    }
}
